Previous slide Next slide Toggle fullscreen Open presenter view
Программирование сетевых приложений
Организация потоков, параллельной обработки, синхронизации и распределенной обработки синхронизуемых участков кода
Программирование сетевых приложений
Содержание лекции
Общее представление о потоках
Создание, остановка и соединение потоков
Планирование потоков и управление ими
Синхронизация потоков
Синхронизированные методы и блоки операторов
Тупики и методы их предотвращения
Коммуникация между потоками
Примеры на C++ и Qt
Организация потоков и синхронизация
Программирование сетевых приложений
Введение в многопоточность
Многопоточность - это механизм, позволяющий программе выполнять несколько операций одновременно. В современных приложениях потоки используются для повышения производительности, отзывчивости интерфейса и эффективного использования многоядерных процессоров.
Организация потоков и синхронизация
Программирование сетевых приложений
Основные понятия
Поток (thread) - наименьшая единица обработки, которую может запланировать операционная система
Процесс (process) - экземпляр выполняющейся программы с собственным адресным пространством
Параллелизм - одновременное выполнение нескольких вычислений
Синхронизация - координация выполнения потоков для предотвращения конфликтов
Организация потоков и синхронизация
Программирование сетевых приложений
Создание потоков в C++
#include <iostream>
#include <thread>
#include <chrono>
void workerFunction (int id) {
std::cout << "Поток " << id << " начал работу" << std::endl;
std::this_thread::sleep_for (std::chrono::seconds (1 ));
std::cout << "Поток " << id << " завершил работу" << std::endl;
}
int main () {
std::cout << "Главный поток: создание потоков" << std::endl;
std::thread thread1 (workerFunction, 1 ) ;
std::thread thread2 (workerFunction, 2 ) ;
thread1.join ();
thread2.join ();
std::cout << "Главный поток: все потоки завершены" << std::endl;
return 0 ;
}
Организация потоков и синхронизация
Программирование сетевых приложений
Потоки в Qt
#include <QThread>
#include <QDebug>
#include <QCoreApplication>
class Worker : public QObject {
Q_OBJECT
public slots:
void doWork () {
qDebug () << "Рабочий поток:" << QThread::currentThreadId ();
emit workFinished () ;
}
signals:
void workFinished () ;
};
class Controller : public QObject {
Q_OBJECT
public :
Controller () {
Worker* worker = new Worker;
QThread* workerThread = new QThread (this );
worker->moveToThread (workerThread);
connect (workerThread, &QThread::started, worker, &Worker::doWork);
connect (worker, &Worker::workFinished, workerThread, &QThread::quit);
connect (worker, &Worker::workFinished, worker, &Worker::deleteLater);
connect (workerThread, &QThread::finished, workerThread, &QThread::deleteLater);
workerThread->start ();
}
};
Организация потоков и синхронизация
Программирование сетевых приложений
Наследование от QThread
#include <QThread>
#include <QDebug>
class CustomThread : public QThread {
Q_OBJECT
protected :
void run () override {
qDebug () << "Поток начал выполнение:" << currentThreadId ();
for (int i = 0 ; i < 5 ; ++i) {
qDebug () << "Итерация" << i << "в потоке" << currentThreadId ();
msleep (1000 );
}
qDebug () << "Поток завершил выполнение:" << currentThreadId ();
}
};
int main (int argc, char *argv[]) {
QCoreApplication app (argc, argv) ;
CustomThread thread;
thread.start ();
thread.wait ();
return 0 ;
}
Организация потоков и синхронизация
Программирование сетевых приложений
Синхронизация потоков с использованием мьютексов
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
std::mutex mtx;
int sharedCounter = 0 ;
void incrementCounter (int iterations) {
for (int i = 0 ; i < iterations; ++i) {
mtx.lock ();
++sharedCounter;
mtx.unlock ();
}
}
int main () {
const int numThreads = 4 ;
const int iterationsPerThread = 100000 ;
std::vector<std::thread> threads;
for (int i = 0 ; i < numThreads; ++i) {
threads.emplace_back (incrementCounter, iterationsPerThread);
}
for (auto & t : threads) {
t.join ();
}
std::cout << "Итоговое значение счетчика: " << sharedCounter << std::endl;
std::cout << "Ожидаемое значение: " << numThreads * iterationsPerThread << std::endl;
return 0 ;
}
Организация потоков и синхронизация
Программирование сетевых приложений
Блокировка с использованием std::lock_guard
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
std::mutex mtx;
int sharedCounter = 0 ;
void incrementCounter (int iterations) {
for (int i = 0 ; i < iterations; ++i) {
std::lock_guard<std::mutex> lock (mtx) ;
++sharedCounter;
}
}
int main () {
const int numThreads = 4 ;
const int iterationsPerThread = 100000 ;
std::vector<std::thread> threads;
for (int i = 0 ; i < numThreads; ++i) {
threads.emplace_back (incrementCounter, iterationsPerThread);
}
for (auto & t : threads) {
t.join ();
}
std::cout << "Итоговое значение счетчика: " << sharedCounter << std::endl;
return 0 ;
}
Организация потоков и синхронизация
Программирование сетевых приложений
Синхронизация в Qt с использованием QMutex
#include <QCoreApplication>
#include <QThread>
#include <QMutex>
#include <QDebug>
#include <QVector>
class Counter : public QObject {
Q_OBJECT
private :
int value;
QMutex mutex;
public :
Counter () : value (0 ) {}
public slots:
void increment () {
mutex.lock ();
int temp = value;
QThread::msleep (1 );
value = temp + 1 ;
mutex.unlock ();
}
int getValue () const {
mutex.lock ();
int result = value;
mutex.unlock ();
return result;
}
};
class Worker : public QObject {
Q_OBJECT
private :
Counter* counter;
public :
Worker (Counter* c) : counter (c) {}
public slots:
void doWork () {
for (int i = 0 ; i < 1000 ; ++i) {
counter->increment ();
}
emit workFinished () ;
}
signals:
void workFinished () ;
};
Организация потоков и синхронизация
Программирование сетевых приложений
Атомарные операции
#include <iostream>
#include <thread>
#include <atomic>
#include <vector>
std::atomic<int > atomicCounter (0 ) ;
void incrementAtomic (int iterations) {
for (int i = 0 ; i < iterations; ++i) {
atomicCounter.fetch_add (1 , std::memory_order_relaxed);
}
}
int main () {
const int numThreads = 4 ;
const int iterationsPerThread = 100000 ;
std::vector<std::thread> threads;
for (int i = 0 ; i < numThreads; ++i) {
threads.emplace_back (incrementAtomic, iterationsPerThread);
}
for (auto & t : threads) {
t.join ();
}
std::cout << "Атомарный счетчик: " << atomicCounter.load () << std::endl;
return 0 ;
}
Организация потоков и синхронизация
Программирование сетевых приложений
Условные переменные
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
std::mutex mtx;
std::condition_variable cv;
std::queue<int > dataQueue;
bool finished = false ;
void producer () {
for (int i = 0 ; i < 10 ; ++i) {
std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
std::unique_lock<std::mutex> lock (mtx) ;
dataQueue.push (i);
std::cout << "Производитель: добавлен элемент " << i << std::endl;
lock.unlock ();
cv.notify_one ();
}
std::unique_lock<std::mutex> lock (mtx) ;
finished = true ;
lock.unlock ();
cv.notify_all ();
}
void consumer (int id) {
while (true ) {
std::unique_lock<std::mutex> lock (mtx) ;
cv.wait (lock, [] { return !dataQueue.empty () || finished; });
if (!dataQueue.empty ()) {
int data = dataQueue.front ();
dataQueue.pop ();
lock.unlock ();
std::cout << "Потребитель " << id << ": получен элемент " << data << std::endl;
} else if (finished) {
break ;
}
}
}
Организация потоков и синхронизация
Программирование сетевых приложений
Тупики (Deadlocks), несколько потоков ожидают друг друга:
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mutex1;
std::mutex mutex2;
void threadFunction1 () {
std::lock_guard<std::mutex> lock1 (mutex1) ;
std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
std::lock_guard<std::mutex> lock2 (mutex2) ;
std::cout << "Поток 1: получены оба мьютекса" << std::endl;
}
void threadFunction2 () {
std::lock_guard<std::mutex> lock2 (mutex2) ;
std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
std::lock_guard<std::mutex> lock1 (mutex1) ;
std::cout << "Поток 2: получены оба мьютекса" << std::endl;
}
int main () {
std::thread t1 (threadFunction1) ;
std::thread t2 (threadFunction2) ;
t1.join ();
t2.join ();
return 0 ;
}
Организация потоков и синхронизация
Программирование сетевых приложений
Предотвращение тупиков с std::lock
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mutex1;
std::mutex mutex2;
void threadFunction1 () {
std::unique_lock<std::mutex> lock1 (mutex1, std::defer_lock) ;
std::unique_lock<std::mutex> lock2 (mutex2, std::defer_lock) ;
std::lock (lock1, lock2);
std::cout << "Поток 1: получены оба мьютекса" << std::endl;
}
void threadFunction2 () {
std::unique_lock<std::mutex> lock1 (mutex1, std::defer_lock) ;
std::unique_lock<std::mutex> lock2 (mutex2, std::defer_lock) ;
std::lock (lock1, lock2);
std::cout << "Поток 2: получены оба мьютекса" << std::endl;
}
int main () {
std::thread t1 (threadFunction1) ;
std::thread t2 (threadFunction2) ;
t1.join ();
t2.join ();
return 0 ;
}
Организация потоков и синхронизация
Программирование сетевых приложений
Коммуникация между потоками в Qt (сигналы и слоты)
#include <QCoreApplication>
#include <QThread>
#include <QDebug>
#include <QTimer>
class Worker : public QObject {
Q_OBJECT
private :
int counter;
public :
Worker () : counter (0 ) {}
public slots:
void process () {
++counter;
emit progress (counter) ;
if (counter >= 10 ) {
emit finished () ;
} else {
QTimer::singleShot (1000 , this , &Worker::process);
}
}
signals:
void progress (int value) ;
void finished () ;
};
class Controller : public QObject {
Q_OBJECT
private :
Worker* worker;
QThread* workerThread;
public :
Controller () {
worker = new Worker;
workerThread = new QThread (this );
worker->moveToThread (workerThread);
connect (workerThread, &QThread::started, worker, &Worker::process);
connect (worker, &Worker::progress, this , &Controller::onProgress);
connect (worker, &Worker::finished, this , &Controller::onFinished);
connect (worker, &Worker::finished, workerThread, &QThread::quit);
connect (workerThread, &QThread::finished, worker, &Worker::deleteLater);
connect (workerThread, &QThread::finished, workerThread, &QThread::deleteLater);
workerThread->start ();
}
public slots:
void onProgress (int value) {
qDebug () << "Прогресс:" << value;
}
void onFinished () {
qDebug () << "Работа завершена" ;
QCoreApplication::quit ();
}
};
Организация потоков и синхронизация
Программирование сетевых приложений
Потокобезопасные очереди
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <iostream>
template <typename T>
class ThreadSafeQueue {
private :
std::queue<T> queue;
mutable std::mutex mtx;
std::condition_variable cv;
public :
void push (const T& item) {
{
std::unique_lock<std::mutex> lock (mtx) ;
queue.push (item);
}
cv.notify_one ();
}
bool pop (T& item) {
std::unique_lock<std::mutex> lock (mtx) ;
cv.wait (lock, [this ] { return !queue.empty (); });
if (queue.empty ()) {
return false ;
}
item = queue.front ();
queue.pop ();
return true ;
}
bool try_pop (T& item) {
std::unique_lock<std::mutex> lock (mtx) ;
if (queue.empty ()) {
return false ;
}
item = queue.front ();
queue.pop ();
return true ;
}
size_t size () const {
std::unique_lock<std::mutex> lock (mtx) ;
return queue.size ();
}
};
Организация потоков и синхронизация
Программирование сетевых приложений
Планирование потоков
Планировщик ОС решает, какой поток выполняется на каком ядре CPU.
Два основных подхода:
Вытесняющее (preemptive) — ОС принудительно переключает потоки (Windows, Linux). Каждый поток получает квант времени (time slice).
Кооперативное (cooperative) — потоки отдают управление добровольно (ранние системы, некоторые встраиваемые).
Алгоритмы планирования:
Round-Robin (циклический) — потоки по очереди, фиксированный квант
Priority-based (по приоритетам) — поток с наивысшим приоритетом выполняется первым
CFS (Completely Fair Scheduler, Linux) — красно-чёрное дерево, справедливое распределение CPU
Факторы, влияющие на планирование: приоритет, CPU affinity, I/O-ожидание, nice value
Организация потоков и синхронизация
Программирование сетевых приложений
Приоритеты потоков в Qt
QThread::Priority: IdlePriority, LowestPriority, LowPriority, NormalPriority, HighPriority, HighestPriority, TimeCriticalPriority, InheritPriority
Приоритет потока — это рекомендация для ОС, а не гарантия.
Некорректное использование приоритетов может привести к starvation (голоданию) низкоприоритетных потоков.
#include <QThread>
class NetworkWorker : public QThread {
protected :
void run () override {
setPriority (QThread::HighPriority);
#ifdef Q_OS_LINUX
cpu_set_t cpuset;
CPU_ZERO (&cpuset);
CPU_SET (0 , &cpuset);
pthread_setaffinity_np ((pthread_t )handle (), sizeof (cpu_set_t ), &cpuset);
#endif
}
};
Организация потоков и синхронизация
Программирование сетевых приложений
Заключение
Многопоточность - это мощный инструмент, который требует тщательного проектирования и понимания. Ключевые принципы:
Минимизация общих ресурсов - чем меньше данных разделяется между потоками, тем проще обеспечить синхронизацию
Использование высокоуровневых абстракций - Qt предоставляет мощные механизмы для работы с потоками
Предотвращение тупиков - всегда блокировать мьютексы в одинаковом порядке
Правильное завершение потоков - использовать механизмы сигналов/слотов для безопасного завершения
Тестирование - многопоточные приложения требуют тщательного тестирования на наличие гонок и тупиков
Понимание этих концепций позволяет создавать эффективные и надежные многопоточные приложения на C++ с использованием Qt.
Организация потоков и синхронизация
Программирование сетевых приложений
Вопросы для самопроверки
Какие существуют способы создания потоков в C++?
Чем отличается std::thread от QThread?
Какие механизмы синхронизации существуют в C++?
Что такое тупик и как его предотвратить?
Как организовать коммуникацию между потоками в Qt?
Когда следует использовать атомарные операции вместо мьютексов?
Какие преимущества дает использование потокобезопасных контейнеров?
Организация потоков и синхронизация
Заметки докладчика:
- Многопоточность критически важна для сетевых серверов — без неё невозможно обслуживать нескольких клиентов одновременно.
- Эта лекция короче других — используйте оставшееся время для более глубокого обсуждения тем и живых демонстраций.
Заметки докладчика:
- std::thread запускается немедленно при создании объекта.
- Всегда вызывайте join() или detach() — иначе при уничтожении объекта thread будет вызван std::terminate и программа аварийно завершится.
- Используйте RAII-обёртку или std::jthread (C++20), который автоматически вызывает join() в деструкторе.
Заметки докладчика:
- КРИТИЧЕСКОЕ ПРАВИЛО Qt: QObject существует только в одном потоке. QTcpSocket нельзя использовать из потока, отличного от потока его родителя.
- Используйте moveToThread() для перемещения объектов между потоками.
- Сигналы/слоты между разными потоками автоматически используют QueuedConnection — это безопасно.
Заметки докладчика:
- Всегда держите мьютекс заблокированным минимальное время.
- НИКОГДА не выполняйте ввод-вывод под блокировкой — это убивает производительность сетевого сервера.
- Используйте std::lock_guard / std::unique_lock для исключительной безопасности (RAII).
- В Qt: QMutex совместно с QMutexLocker.
Заметки докладчика:
- condition_variable применяется для шаблона «производитель-потребитель» — особенно актуально при обработке сетевых пакетов.
- Всегда используйте с unique_lock, а не с lock_guard (lock_guard не поддерживает разблокировку).
- Возможны ложные пробуждения (spurious wakeups) — ВСЕГДА проверяйте условие в цикле, а не в if.
Заметки докладчика:
- Классическая задача «обедающих философов» — основной пример взаимной блокировки.
- Стратегии предотвращения: всегда блокировать мьютексы в одинаковом порядке, использовать std::lock (избегает дедлоков), применять std::scoped_lock (C++17).
- В сетевом программировании дедлок = сервер перестаёт отвечать на запросы клиентов.
Заметки докладчика:
- Планирование потоков — обязательный пункт учебной программы (тема 7).
- Для сетевых серверов: I/O-bound потоки (ожидание сети) должны иметь нормальный приоритет, CPU-bound (обработка) — ниже, чтобы не блокировать сеть.
- CFS (Completely Fair Scheduler) — алгоритм Linux, использует красно-чёрное дерево для учёта времени каждого потока.
- Приоритеты Qt — это обёртка над приоритетами ОС. QThread::InheritPriority означает наследование от создающего потока.
- На практике: не стоит злоупотреблять приоритетами. Лучше правильно проектировать архитектуру (thread pool, async I/O).
Заметки докладчика:
Ожидаемые ответы:
1. std::thread, QThread (наследование), QObject + moveToThread.
2. std::thread — низкоуровневый API стандартной библиотеки; QThread — интеграция с циклом событий Qt, сигналы/слоты.
3. Мьютексы (std::mutex, QMutex), атомарные операции (std::atomic), условные переменные (condition_variable), семафоры.
4. Тупик — ситуация, когда потоки бесконечно ожидают ресурсы друг друга. Предотвращение: единый порядок блокировки, std::lock, std::scoped_lock.
5. Сигналы и слоты (QueuedConnection между потоками), общие данные с мьютексами, QMetaObject::invokeMethod.
6. Когда операция простая и не требует защиты нескольких связанных переменных (например, счётчик). Атомарные операции быстрее мьютексов.
7. Избавляют от необходимости вручную синхронизировать доступ; снижают вероятность ошибок гонки данных.